/*

 * Licensed to the Apache Software Foundation (ASF) under one

 * or more contributor license agreements.  See the NOTICE file

 * distributed with this work for additional information

 * regarding copyright ownership.  The ASF licenses this file

 * to you under the Apache License, Version 2.0 (the

 * "License"); you may not use this file except in compliance

 * with the License.  You may obtain a copy of the License at

 *

 *     http://www.apache.org/licenses/LICENSE-2.0

 *

 * Unless required by applicable law or agreed to in writing, software

 * distributed under the License is distributed on an "AS IS" BASIS,

 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 * See the License for the specific language governing permissions and

 * limitations under the License.

 */

package com.bff.gaia.unified.sdk.extensions.sql.impl;



import com.bff.gaia.unified.sdk.extensions.sql.impl.planner.UnifiedRuleSets;

import com.bff.gaia.unified.sdk.extensions.sql.impl.rel.UnifiedLogicalConvention;

import com.bff.gaia.unified.sdk.extensions.sql.impl.rel.UnifiedRelNode;

import com.bff.gaia.unified.vendor.guava.com.google.common.collect.ImmutableList;

import org.apache.calcite.config.CalciteConnectionConfig;

import org.apache.calcite.jdbc.CalciteSchema;

import org.apache.calcite.plan.*;

import org.apache.calcite.plan.RelOptPlanner.CannotPlanException;

import org.apache.calcite.prepare.CalciteCatalogReader;

import org.apache.calcite.rel.RelRoot;

import org.apache.calcite.schema.SchemaPlus;

import org.apache.calcite.sql.SqlNode;

import org.apache.calcite.sql.SqlOperatorTable;

import org.apache.calcite.sql.fun.SqlStdOperatorTable;

import org.apache.calcite.sql.parser.SqlParseException;

import org.apache.calcite.sql.parser.SqlParser;

import org.apache.calcite.sql.parser.SqlParserImplFactory;

import org.apache.calcite.sql.util.ChainedSqlOperatorTable;

import org.apache.calcite.tools.*;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;



/**
 * Core component that takes a SQL statement through parsing, validation and planning, and
 * produces a {@link UnifiedRelNode} tree for it.
 *
 * <p>A single Calcite {@link Planner} is created per instance and is closed in a {@code finally}
 * block at the end of every {@link #parse} / {@link #convertToUnifiedRel} call.
 *
 * <p>NOTE(review): because that one planner is shared by every call, concurrent use of the same
 * instance is presumably unsafe; confirm against the callers.
 */
class CalciteQueryPlanner implements QueryPlanner {
  private static final Logger LOG = LoggerFactory.getLogger(CalciteQueryPlanner.class);

  private final Planner planner;

  /**
   * Creates a planner configured from the given connection.
   *
   * @param connection the connection supplying schemas, type factory and parser settings
   */
  CalciteQueryPlanner(JdbcConnection connection) {
    // NOTE(review): defaultConfig is public and overridable, and it is invoked here before
    // construction has finished; an overriding subclass must not rely on its own fields.
    planner = Frameworks.getPlanner(defaultConfig(connection));
  }

  /**
   * Builds the Calcite {@link FrameworkConfig} used by this planner.
   *
   * <p>Parser settings, the operator table and the type system are all taken from {@code
   * connection}; the rule sets come from {@link UnifiedRuleSets#getRuleSets()}.
   *
   * @param connection the connection supplying schemas, type factory and parser settings
   * @return the framework configuration for {@link Frameworks#getPlanner}
   */
  public FrameworkConfig defaultConfig(JdbcConnection connection) {
    final CalciteConnectionConfig config = connection.config();

    final SchemaPlus rootSchema = connection.getRootSchema();
    final SchemaPlus defaultSchema = connection.getCurrentSchemaPlus();

    // Only the calling convention is tracked as a trait.
    final ImmutableList<RelTraitDef> traitDefs = ImmutableList.of(ConventionTraitDef.INSTANCE);

    // The catalog reader is chained into the operator table below, after the operator table
    // configured on the connection (standard operators by default).
    final CalciteCatalogReader catalogReader =
        new CalciteCatalogReader(
            CalciteSchema.from(rootSchema),
            ImmutableList.of(defaultSchema.getName()),
            connection.getTypeFactory(),
            config);
    final SqlOperatorTable configuredOperators =
        config.fun(SqlOperatorTable.class, SqlStdOperatorTable.instance());

    return Frameworks.newConfigBuilder()
        .parserConfig(buildParserConfig(config))
        .defaultSchema(defaultSchema)
        .traitDefs(traitDefs)
        .context(Contexts.of(config))
        .ruleSets(UnifiedRuleSets.getRuleSets())
        .costFactory(null)
        .typeSystem(connection.getTypeFactory().getTypeSystem())
        .operatorTable(ChainedSqlOperatorTable.of(configuredOperators, catalogReader))
        .build();
  }

  /** Derives the SQL parser configuration (casing, quoting, conformance) from {@code config}. */
  private static SqlParser.Config buildParserConfig(CalciteConnectionConfig config) {
    final SqlParser.ConfigBuilder builder =
        SqlParser.configBuilder()
            .setQuotedCasing(config.quotedCasing())
            .setUnquotedCasing(config.unquotedCasing())
            .setQuoting(config.quoting())
            .setConformance(config.conformance())
            .setCaseSensitive(config.caseSensitive());

    // A custom parser factory is optional; keep the builder's default when none is configured.
    final SqlParserImplFactory parserFactory =
        config.parserFactory(SqlParserImplFactory.class, null);
    if (parserFactory != null) {
      builder.setParserFactory(parserFactory);
    }
    return builder.build();
  }

  /**
   * Parses the input SQL query and returns a {@link SqlNode} as its grammar tree.
   *
   * @param sqlStatement the SQL text to parse
   * @return the parsed statement
   * @throws ParseException if Calcite cannot parse the statement; the cause is preserved
   */
  @Override
  public SqlNode parse(String sqlStatement) throws ParseException {
    try {
      return planner.parse(sqlStatement);
    } catch (SqlParseException e) {
      throw new ParseException(String.format("Unable to parse query %s", sqlStatement), e);
    } finally {
      // Always close the shared planner, on success and on failure alike.
      planner.close();
    }
  }

  /**
   * Parses and validates the input query, then converts it into a {@link UnifiedRelNode} tree.
   *
   * @param sqlStatement the SQL text to plan
   * @return the root of the converted plan
   * @throws ParseException if the statement cannot be parsed or fails validation
   * @throws SqlConversionException if the validated statement cannot be converted or planned
   */
  @Override
  public UnifiedRelNode convertToUnifiedRel(String sqlStatement)
      throws ParseException, SqlConversionException {
    try {
      final SqlNode parsed = planner.parse(sqlStatement);
      final SqlNode validated = planner.validate(parsed);
      LOG.info("SQL:\n{}", validated);

      // Root of the original logical plan.
      final RelRoot root = planner.rel(validated);
      if (LOG.isInfoEnabled()) {
        // Rendering the plan walks the whole tree, so skip it when INFO is disabled.
        LOG.info("SQLPlan>\n{}", RelOptUtil.toString(root.rel));
      }

      // Request the Unified convention while keeping the collation required by the root.
      final RelTraitSet desiredTraits =
          root.rel
              .getTraitSet()
              .replace(UnifiedLogicalConvention.INSTANCE)
              .replace(root.collation)
              .simplify();

      // Unified physical plan; 0 is the index of the rule set to apply.
      final UnifiedRelNode unifiedRelNode =
          (UnifiedRelNode) planner.transform(0, desiredTraits, root.rel);
      if (LOG.isInfoEnabled()) {
        LOG.info("BEAMPlan>\n{}", RelOptUtil.toString(unifiedRelNode));
      }
      return unifiedRelNode;
    } catch (RelConversionException | CannotPlanException e) {
      throw new SqlConversionException(
          String.format("Unable to convert query %s", sqlStatement), e);
    } catch (SqlParseException | ValidationException e) {
      throw new ParseException(String.format("Unable to parse query %s", sqlStatement), e);
    } finally {
      // Always close the shared planner, on success and on failure alike.
      planner.close();
    }
  }
}